(ns metabase-enterprise.semantic-search.index
  (:require
   [buddy.core.codecs :as buddy-codecs]
   [buddy.core.hash :as buddy-hash]
   [clojure.string :as str]
   [com.climate.claypoole :as cp]
   [honey.sql :as sql]
   [honey.sql.helpers :as sql.helpers]
   [java-time.api :as t]
   ;; TODO: extract schema code to go under db.migration
   [metabase-enterprise.semantic-search.embedding :as embedding]
   [metabase-enterprise.semantic-search.scoring :as scoring]
   [metabase-enterprise.semantic-search.settings :as semantic-settings]
   [metabase.analytics.core :as analytics]
   [metabase.models.interface :as mi]
   [metabase.search.config :as search.config]
   [metabase.search.core :as search]
   [metabase.util :as u]
   [metabase.util.json :as json]
   [metabase.util.log :as log]
   [next.jdbc :as jdbc]
   [next.jdbc.result-set :as jdbc.rs]
   [toucan2.core :as t2])
  (:import
   [java.time Instant LocalDate OffsetDateTime ZonedDateTime]
   [java.util.concurrent ArrayBlockingQueue RejectedExecutionHandler RejectedExecutionException TimeUnit ThreadPoolExecutor]
   [org.postgresql.util PGobject]))

(set! *warn-on-reflection* true)

(comment
  ;; REPL scratch: call the datasource namespace's `init-db!`, then bind `db` to the value obtained by
  ;; dereferencing the resolved `data-source` var and then its contents.
  ((requiring-resolve 'metabase-enterprise.semantic-search.db.datasource/init-db!))
  (def db @@(requiring-resolve 'metabase-enterprise.semantic-search.db.datasource/data-source)))

(defn sql-format-quoted
  "Format `honey-sql` with [[sql/format]], always forcing `:quoted true` on top of any supplied `opts`.

  Quoting is required because identifiers in this namespace (e.g. nano-id based temp table names used in tests)
  can contain uppercase characters or hyphens."
  [honey-sql & {:as opts}]
  (let [format-opts (assoc opts :quoted true)]
    (sql/format honey-sql format-opts)))

(def ^:dynamic *batch-size*
  "The maximum number of items per batch when updating the index.

  Used in this namespace to partition documents/records for upserts and ids for deletes. Dynamic so that it can be
  rebound with `binding`."
  150)

(defn- index-table-schema
  "Schema for the index table, as a vector of column specs passed to `sql.helpers/with-columns`.

  `vector-dimensions` is interpolated with `%d` into the `vector(N)` type of the `embedding` column. The last entry
  is a unique constraint over `(model, model_id)`."
  [vector-dimensions]
  ;; Generate unique constraint name to avoid index name conflicts between multiple test tables
  (let [unique-constraint-name (keyword (str "unique_constraint_" (u/generate-nano-id) "_model_model_id"))]
    [[:id :bigint [:primary-key] [:raw "GENERATED BY DEFAULT AS IDENTITY"]]
     [:model :text :not-null]
     [:model_id :text :not-null]
     [:collection_id :int]
     [:creator_id :int]
     [:database_id :int]
     [:last_editor_id :int]
     [:name :text :not-null]
     [:content :text :not-null]
     [:display_type :text]
     [:archived :boolean [:default false]]
     [:official_collection :boolean]
     [:pinned :boolean]
     [:verified :boolean]
     [:dashboardcard_count :int]
     [:view_count :int]
     ;; created_at is the row's own insertion time; the model_* timestamps below are copied from the document.
     [:created_at :timestamp-with-time-zone [:default [:raw "CURRENT_TIMESTAMP"]] :not-null]
     [:model_created_at :timestamp-with-time-zone]
     [:model_updated_at :timestamp-with-time-zone]
     [:last_viewed_at :timestamp-with-time-zone]
     [:legacy_input :jsonb]
     [:metadata :jsonb]
     [:text_search_vector :tsvector :not-null]
     [:text_search_with_native_query_vector :tsvector :not-null]
     [:embedding [:raw (format "vector(%d)" vector-dimensions)] :not-null]
     ;; The upsert in this namespace uses `ON CONFLICT (model, model_id)`, which relies on this constraint.
     [[:constraint unique-constraint-name]
      [:unique [:composite :model :model_id]]]]))

(defn- format-embedding
  "Renders `embedding` as a `'[v1, v2, ...]'::vector` SQL literal.

  Throws an `ex-info` if any element is not a number; the literal is spliced into raw SQL, so this check guards
  against malformed or malicious values."
  [embedding]
  (run! (fn [value]
          (when-not (number? value)
            (throw (ex-info "Embedding contains non-numeric value"
                            {:invalid-value value
                             :embedding embedding}))))
        embedding)
  (format "'[%s]'::vector" (str/join ", " embedding)))

(defn- to-instant
  "Coerces `document-timestamp` to a `java.time.Instant`.

  Strings are parsed with `Instant/parse` (so they must be in a format it accepts, e.g. ISO-8601 instants as
  produced when timestamps are JSON-encoded); `OffsetDateTime` and `ZonedDateTime` are converted directly; anything
  else goes through `inst-ms`."
  [document-timestamp]
  (if (string? document-timestamp)
    ;; loosey-goosey, but applies to json encoded instants from the gate table
    (Instant/parse document-timestamp)
    (condp instance? document-timestamp
      OffsetDateTime (.toInstant ^OffsetDateTime document-timestamp)
      ZonedDateTime  (.toInstant ^ZonedDateTime document-timestamp)
      (Instant/ofEpochMilli (inst-ms document-timestamp)))))

(defn- to-boolean
  "Coerces `b` to a real boolean. Booleans pass through; `0` maps to false and `1` to true (MySQL represents
   booleans as 0/1, and we insert into postgres). Any other value throws an `ex-info`. `b` must not be nil."
  [b]
  {:pre [(some? b)]}
  (if (boolean? b)
    b
    (condp = b
      0 false
      1 true
      (throw (ex-info "Unexpected boolean value" {:v b})))))

(defn- doc->db-record
  "Builds the row map to insert into the index table for `doc`, using `embedding-vec` as its embedding.

  Boolean and timestamp fields are coerced when present; `legacy_input` and the whole `doc` are JSON-encoded and cast
  to jsonb. Both text search vectors weight the name as \"A\" and the remaining text as \"B\"; when the document has
  no name the remaining text is weighted \"A\" instead."
  [embedding-vec {:keys [model id searchable_text embeddable_text native_query created_at creator_id updated_at
                         last_editor_id archived verified official_collection database_id collection_id display_type legacy_input
                         pinned dashboardcard_count view_count last_viewed_at] :as doc}]
  (let [doc-name        (:name doc)
        base-text       (or searchable_text "")
        ;; searchable text plus the native query, skipping whichever of the two is blank
        text-with-query (str/join " " (remove str/blank? [base-text (or native_query "")]))
        ->tsvector      (fn [body]
                          (if doc-name
                            [:||
                             (search/weighted-tsvector "A" doc-name)
                             (search/weighted-tsvector "B" body)]
                            (search/weighted-tsvector "A" body)))]
    {:model                                model
     :model_id                             id
     :collection_id                        collection_id
     :creator_id                           creator_id
     :database_id                          database_id
     :last_editor_id                       last_editor_id
     :name                                 (or doc-name "")
     :content                              embeddable_text
     :display_type                         display_type
     :archived                             (some-> archived to-boolean)
     :official_collection                  (some-> official_collection to-boolean)
     :pinned                               (some-> pinned to-boolean)
     :verified                             (some-> verified to-boolean)
     :dashboardcard_count                  dashboardcard_count
     :view_count                           view_count
     :model_created_at                     (some-> created_at to-instant)
     :model_updated_at                     (some-> updated_at to-instant)
     :last_viewed_at                       (some-> last_viewed_at to-instant)
     :legacy_input                         [:cast (json/encode legacy_input) :jsonb]
     :metadata                             [:cast (json/encode doc) :jsonb]
     :embedding                            [:raw (format-embedding embedding-vec)]
     :text_search_vector                   (->tsvector base-text)
     :text_search_with_native_query_vector (->tsvector text-with-query)}))

(defn index-size
  "Returns the row count of the index table `table-name`, queried through `connectable`."
  [connectable table-name]
  (let [count-query (sql-format-quoted
                     (-> (sql.helpers/select [:%count.* :count])
                         (sql.helpers/from (keyword table-name))
                         (sql.helpers/limit 1)))
        row         (jdbc/execute-one! connectable count-query)]
    (:count row)))

(defn- analytics-set-index-size!
  "Publishes the current row count of the index table as the `:metabase-search/semantic-index-size` metric.
  Best-effort: any `Exception` is logged as a warning and not rethrown."
  [connectable table-name]
  (try
    (let [row-count (index-size connectable table-name)]
      (analytics/set! :metabase-search/semantic-index-size row-count))
    (catch Exception e
      (log/warn e "Failed to set :metabase-search/semantic-index-size metric"))))

(defn- batch-update!
  "Converts each document (paired positionally with its embedding) to a db record, executes the SQL produced by
  `records->sql` for every `*batch-size*` chunk of records, then refreshes the index-size metric.

  Returns a map of model -> number of documents, or nil when `documents` is empty."
  [connectable table-name records->sql documents embeddings]
  (when (seq documents)
    (let [model-counts (frequencies (map :model documents))]
      (u/profile (str "Semantic index database update of " (count documents) " documents " model-counts)
        (doseq [batch (partition-all *batch-size* (map doc->db-record embeddings documents))]
          (jdbc/execute! connectable (records->sql batch)))
        (analytics-set-index-size! connectable table-name))
      model-counts)))

(defn- execute-with-counts
  "Executes `sql` and returns `(count ids)`, logging it at debug level. Note the returned number is the size of the
  submitted id batch, not a row count reported by the database."
  [connectable model ids sql]
  (jdbc/execute! connectable sql)
  (let [batch-count (count ids)]
    (log/debug "semantic search deleted a batch of" batch-count
               "documents with model type" model)
    batch-count))

(defn- batch-delete-ids!
  "Deletes `ids` in chunks of `*batch-size*`. Each chunk is turned into SQL with `ids->sql`; a chunk for which
  `ids->sql` returns nil is skipped and counts as 0.

  When anything was deleted, logs the total, refreshes the index-size metric and returns `{model total}`;
  otherwise returns nil."
  [connectable table-name model ids->sql ids]
  (let [delete-batch (fn [id-batch]
                       (or (when-some [delete-sql (ids->sql id-batch)]
                             (execute-with-counts connectable model id-batch delete-sql))
                           0))
        deleted      (transduce (comp (partition-all *batch-size*)
                                      (map delete-batch))
                                +
                                ids)]
    (when (pos? deleted)
      (log/info "semantic search deleted" deleted "total documents with model type" model)
      (analytics-set-index-size! connectable table-name)
      {model deleted})))

(defn- db-records->update-set
  "Builds the `DO UPDATE SET` map for an upsert: every column of the first record except `:id`, `:model` and
  `:model_id` is mapped to its `excluded.<column>` counterpart."
  [db-records]
  (let [updatable-columns (keys (dissoc (first db-records) :id :model :model_id))]
    (into {}
          (map (fn [column]
                 [column (keyword (str "excluded." (name column)))]))
          updatable-columns)))

(defn hash-identifier-if-exceeds-pg-limit
  "Sometimes we need to generate new table/index names, such as when forcing a new index despite no change in index parameters.
  When we do so we need to be sure any index names do not exceed the postgres limit for names. This function will hash the identifier
  if it exceeds the length, and will get a name like index_${sha1} instead.

  The postgres limit is 63 *bytes*, not characters, so the length is measured on the UTF-8 encoding of the
  identifier: an identifier containing multi-byte characters may be hashed even though it has 63 or fewer characters.

  Returns `identifier` unchanged when it fits, otherwise the hashed name (a warning is logged).

  Note: The index parameters will still be available in index_metadata"
  [identifier]
  ;; NOTE(review): assumes the database encodes identifiers as UTF-8 — confirm for non-UTF-8 server encodings.
  (let [byte-length (alength (.getBytes (str identifier) "UTF-8"))]
    (if (<= byte-length 63)
      identifier
      (let [hashed-name (str "index_" (buddy-codecs/bytes->hex (buddy-hash/sha1 identifier)))]
        (log/warnf "Using hashed name for index table %s as original table name %s exceeded the maximum table name length" hashed-name identifier)
        hashed-name))))

(defn model-table-suffix
  "Returns a numeric suffix for a table name: the current epoch second modulo 10000000."
  []
  (-> (t/offset-date-time)
      .toEpochSecond
      (mod 10000000)))

(defn model-table-name
  "Returns the default table name for `embedding-model`, of the form
  `index_<provider>_<model>_<dimensions>` using the abbreviated provider and model names. The name is passed through
  [[hash-identifier-if-exceeds-pg-limit]], so an over-long name is replaced by a hashed one."
  [embedding-model]
  (let [provider-part (embedding/abbrev-provider-name (:provider embedding-model))
        model-part    (embedding/abbrev-model-name (:model-name embedding-model))
        dimensions    (:vector-dimensions embedding-model)]
    (-> (str "index_" provider-part "_" model-part "_" dimensions)
        hash-identifier-if-exceeds-pg-limit)))

(defn default-index
  "Returns the default index spec for `embedding-model`: a map of `:embedding-model`, `:table-name` and `:version`.
  An explicit `:table-name` option overrides the name derived by [[model-table-name]]."
  [embedding-model & {:keys [table-name]}]
  {:embedding-model embedding-model
   :table-name      (or table-name (model-table-name embedding-model))
   :version         1})

(defn- upsert-embedding!-fn
  "Returns a function of `text->embedding` (a collection of [text embedding] pairs) that upserts, into the table named
  by `index`, every document in `text->docs` whose text has an embedding. Texts with no embedding are logged with a
  warning and skipped. The returned function yields whatever `batch-update!` returns."
  [connectable index text->docs]
  (let [table-name   (:table-name index)
        ;; INSERT ... ON CONFLICT (model, model_id) DO UPDATE SET <every other column> = excluded.<column>
        records->sql (fn [db-records]
                       (sql-format-quoted
                        (-> (sql.helpers/insert-into (keyword table-name))
                            (sql.helpers/values db-records)
                            (sql.helpers/on-conflict :model :model_id)
                            (sql.helpers/do-update-set (db-records->update-set db-records)))))
        attach       (fn [[text embedding]]
                       (let [docs (get text->docs text)]
                         (cond
                           embedding
                           (map #(assoc % :embedding embedding) docs)

                           docs
                           (log/warn "No embedding found for" (count docs) "documents with searchable text:"
                                     {:searchable_text text
                                      :document_count (count docs)}))))]
    (fn [text->embedding]
      (let [batch-documents (mapcat attach text->embedding)]
        (batch-update! connectable
                       table-name
                       records->sql
                       batch-documents
                       (map :embedding batch-documents))))))

(defn- unwrap-pgobject
  "Returns the string value held by the `PGobject` `obj`."
  [^PGobject obj]
  (.getValue obj))

(defn- decode-pgobject
  "JSON-decodes the string value of a `PGobject` (e.g. one returned for a jsonb column), keywordizing map keys."
  [^PGobject obj]
  (-> obj
      unwrap-pgobject
      (json/decode true)))

(defn- existing-embedding-query
  "HoneySQL query selecting one `content`/`embedding` pair (DISTINCT ON content) for every row of the index table
  whose `content` is in `texts`."
  [index texts]
  (let [table (keyword (:table-name index))]
    (sql.helpers/where
     (sql.helpers/from
      (sql.helpers/select-distinct-on [:content] :content :embedding)
      table)
     [:in :content texts])))

(defn- partition-existing-embeddings
  "Looks up which of `texts` already have an embedding stored in the index table.

  Returns a pair `[texts-without-embedding text->embedding]`, where the second element maps each found text to its
  decoded embedding."
  [connectable index texts]
  (let [query (sql-format-quoted (existing-embedding-query index texts))
        rows  (jdbc/execute! connectable query {:builder-fn jdbc.rs/as-unqualified-lower-maps})
        found (into {}
                    (map (fn [row]
                           [(:content row) (decode-pgobject (:embedding row))]))
                    rows)]
    [(remove found texts) found]))

(defn- upsert-index-batch!
  "Upserts one batch of `documents` into the index table, reusing stored embeddings where possible.

  Documents are grouped by `:embeddable_text`. Texts whose embedding is already present in the index table are
  upserted with that stored embedding; the remaining texts are handed to `embedding/process-embeddings-streaming`
  together with the upsert callback and `opts`. Returns the two results combined with `(merge-with + ...)`, or nil
  when `documents` is empty."
  [connectable index documents & {:as opts}]
  (when (seq documents)
    (let [text->docs        (group-by :embeddable_text documents)
          embeddable-texts  (keys text->docs)
          upsert-embedding! (upsert-embedding!-fn connectable index text->docs)
          ;; `stats` is the result of upserting documents with cached embeddings (nil when none were cached).
          [new-texts stats]
          (u/profile (str "Semantic search embedding caching attempt for " {:docs (count documents) :texts (count embeddable-texts)})
            (let [[new-texts existing-embeddings] (partition-existing-embeddings connectable index embeddable-texts)]
              (if-not (seq existing-embeddings)
                ;; nothing cached: every text needs a freshly generated embedding
                [embeddable-texts nil]
                (u/profile (str "Semantic search cached embedding db update for " {:texts (count existing-embeddings)})
                  [new-texts (upsert-embedding! existing-embeddings)]))))]
      (->>
       ;; NOTE(review): presumably process-embeddings-streaming returns counts mergeable with `stats` — confirm.
       (when (seq new-texts)
         (u/profile (str "Semantic search embedding generation and db update for " {:docs (count documents) :texts (count new-texts)})
           (embedding/process-embeddings-streaming
            (:embedding-model index)
            new-texts
            upsert-embedding!
            opts)))
       (merge-with + stats)))))

;; Bound to true while the rejection handler of `index-update-executor` re-submits a task, so that a rejection
;; raised by that nested submission throws instead of starting another retry loop.
(def ^:private ^:dynamic *retrying* false)

(defonce ^:private
  ^{:doc "A shared thread pool for processing batched documents, including fetching embeddings. A custom retry handler
    is used to block the caller thread until a thread is available, to prevent realizing documents until they can be
    processed."}
  index-update-executor
  (delay
    ;; `n` (from the index-update-thread-count setting) is used as the core pool size, the maximum pool size and
    ;; the capacity of the bounded work queue.
    (let [n (long (semantic-settings/index-update-thread-count))
          retry-handler (reify RejectedExecutionHandler
                          (^void rejectedExecution [_ ^Runnable task ^ThreadPoolExecutor executor]
                            ;; `*retrying*` is true when this handler was re-entered through the `.execute` call
                            ;; below; throwing lets the surrounding try/catch turn the rejection into ::retry.
                            (if *retrying*
                              (throw (RejectedExecutionException.))
                              (loop [attempt 0]
                                (let [op
                                      ;; If the executor has been shut down, run the task on the calling thread.
                                      (if (.isShutdown executor)
                                        (.run task)
                                        (try
                                          (binding [*retrying* true]
                                            (.execute executor task))
                                          (catch RejectedExecutionException _ ::retry)))]
                                  (case op
                                    ::retry
                                    ;; Back off before resubmitting: 10ms doubled per attempt, capped at 500ms.
                                    (let [delay-ms (min 500 (* 10 (Math/pow 2 attempt)))]
                                      (Thread/sleep (long delay-ms))
                                      (recur (inc attempt)))
                                    nil))))))]
      (ThreadPoolExecutor. n n 0 TimeUnit/SECONDS
                           (ArrayBlockingQueue. n)
                           retry-handler))))

(defn upsert-index-pooled!
  "Returns a future which upserts the provided documents into the index table, executed using the provided thread pool.

  The work is submitted to `pool` with `cp/future` and consists of a single `upsert-index-batch!` call, to which
  `opts` is passed as one map."
  [pool connectable index documents & {:as opts}]
  (cp/future pool (upsert-index-batch! connectable index documents opts)))

(defn upsert-index!
  "Inserts or updates documents in the index table. If a document with the same
  model + model_id already exists, it will be replaced.

  `documents-reducible` is split into chunks of `*batch-size*`. By default each chunk is submitted to the shared
  index-update thread pool; with `:serial? true` chunks are processed one after another on the calling thread.
  Returns the per-batch results merged with `+`, or nil when that merged map is empty."
  [connectable index documents-reducible & {:keys [serial?] :or {serial? false}}]
  (let [pool          @index-update-executor
        process-batch (if serial?
                        (fn [batch] (upsert-index-batch! connectable index batch {:type :index}))
                        (fn [batch] (upsert-index-pooled! pool connectable index batch {:type :index})))
        ;; Eagerly submit/process every batch before collecting any result.
        results       (into []
                            (comp (partition-all *batch-size*)
                                  (map process-batch))
                            documents-reducible)
        totals        (reduce (fn [update-counts result]
                                (let [value (if (future? result) @result result)]
                                  (if value
                                    (merge-with + update-counts value)
                                    update-counts)))
                              {}
                              results)]
    (not-empty totals)))

(defn- drop-index-table-sql
  "Formatted (quoted) `DROP TABLE IF EXISTS` statement for the table named by the index's `:table-name`."
  [index]
  (sql-format-quoted
   (sql.helpers/drop-table :if-exists (keyword (:table-name index)))))

(defn drop-index-table!
  "Drops the table named by `index`'s `:table-name`, if it exists."
  [connectable index]
  (let [drop-statement (drop-index-table-sql index)]
    (jdbc/execute! connectable drop-statement)))

;; We can't use full column names in the various index names, because otherwise we overflow postgres' max name length.
;; NOTE If you add a new index, add it to index-embedding-name-length-test as well
(defn- index-name
  "Returns a database index name made of the index's `:table-name` followed by `suffix`, hashed via
  [[hash-identifier-if-exceeds-pg-limit]] when it would be too long."
  [index suffix]
  (-> (:table-name index)
      (str suffix)
      hash-identifier-if-exceeds-pg-limit))

(defn hnsw-index-name
  "Name of the HNSW database index on the `embedding` column for the given semantic search index configuration."
  [index]
  ;; the column name is abbreviated in the suffix: embedding => embed
  (-> index
      (index-name "_embed_hnsw_idx")))

(defn fts-index-name
  "Name of the full-text (GIN) database index on `text_search_vector` for the given semantic search index
  configuration."
  [index]
  ;; the column name is abbreviated in the suffix: text_search_vector => tsv
  (-> index
      (index-name "_tsv_gin_idx")))

(defn fts-native-index-name
  "Name of the full-text (GIN) database index on `text_search_with_native_query_vector` for the given semantic
  search index configuration."
  [index]
  ;; the column name is abbreviated in the suffix: text_search_with_native_query_vector => tswnqv
  (-> index
      (index-name "_tswnqv_gin_idx")))

(defn- content-index-name
  "Name of the database index on `content` for the given semantic search index configuration.
   That index exists to make `content IN ..` queries efficient."
  [index]
  (-> index
      (index-name "_content_idx")))

(defn create-index-table-if-not-exists!
  "Ensure that the index table described by `index` exists, together with its supporting database indexes.

  Statements run in this order: create the `vector` extension, drop the table when `force-reset?` is true, create
  the table, then create the HNSW, the two GIN full-text, and the `content` indexes (each IF NOT EXISTS). Any
  `Exception` is rethrown wrapped in an `ex-info`. A table name containing whitespace fails an assertion."
  [connectable index & {:keys [force-reset?] :or {force-reset? false}}]
  (try
    (let [table-name (:table-name index)]
      (assert (not (re-find #"\s" table-name))
              (format "whitespace in the table name (%s) is not currently supported" table-name))
      (let [vector-dimensions (get-in index [:embedding-model :vector-dimensions])
            table             (keyword table-name)
            exec-quoted!      (fn [honey-sql]
                                (jdbc/execute! connectable (sql-format-quoted honey-sql)))
            ;; `target` is whatever follows the table in the CREATE INDEX spec: an optional method + the column(s)
            create-index!     (fn [idx-name & target]
                                (exec-quoted!
                                 (sql.helpers/create-index
                                  [(keyword idx-name) :if-not-exists]
                                  (into [table] target))))]
        (log/info "Creating index table" table-name)
        (jdbc/execute! connectable (sql/format (sql.helpers/create-extension :vector :if-not-exists)))
        (when force-reset?
          (drop-index-table! connectable index))
        (exec-quoted! (sql.helpers/with-columns
                        (sql.helpers/create-table table :if-not-exists)
                        (index-table-schema vector-dimensions)))
        (create-index! (hnsw-index-name index) :using-hnsw [[:raw "embedding vector_cosine_ops"]])
        (create-index! (fts-index-name index) :using-gin :text_search_vector)
        (create-index! (fts-native-index-name index) :using-gin :text_search_with_native_query_vector)
        (create-index! (content-index-name index) :content)))
    (catch Exception e
      (throw (ex-info "Failed to create index table" {} e)))))

(comment
  ;; REPL scratch: build the default index spec for a sample embedding model, recreate its table against the `db`
  ;; defined in the earlier comment block, then list matching table names from the information schema.
  (def embedding-model {:provider "ollama"
                        :model-name "mxbai-embed-large"
                        :vector-dimensions 1024})
  (def index (default-index embedding-model))
  (drop-index-table! db index)
  (create-index-table-if-not-exists! db index)
  (jdbc/execute! db ["select table_name from INFORMATION_SCHEMA.tables where table_name like 'index_table_%'"]))

(defn- search-filters
  "Builds the HoneySQL WHERE condition for the filters present in the search context.

  Each supplied filter contributes one clause; date filters are only applied when both `:start` and `:end` are
  given. Returns `[:and clause ...]`, or nil when no filter applies."
  [{:keys [archived? verified models created-at created-by last-edited-at last-edited-by table-db-id ids display-type]}]
  (let [date-range (fn [column {:keys [start end]}]
                     [:between column (LocalDate/parse start) (LocalDate/parse end)])
        complete?  (fn [range-filter]
                     (and range-filter (:start range-filter) (:end range-filter)))
        conditions (cond-> []
                     (some? archived?)          (conj [:= :archived archived?])
                     (some? verified)           (conj [:= :verified verified])
                     (seq models)               (conj [:in :model models])
                     (seq created-by)           (conj [:in :creator_id created-by])
                     (seq last-edited-by)       (conj [:in :last_editor_id last-edited-by])
                     table-db-id                (conj [:= :database_id table-db-id])
                     ;; model_id is a text column, so ids are compared as strings
                     (seq ids)                  (conj [:in :model_id (map str ids)])
                     (seq display-type)         (conj [:in :display_type display-type])
                     (complete? created-at)     (conj (date-range :model_created_at created-at))
                     (complete? last-edited-at) (conj (date-range :model_updated_at last-edited-at)))]
    (when (seq conditions)
      (into [:and] conditions))))

;; `[column alias]` pairs selected by every search query in this namespace; each column is aliased to its own name.
(def ^:private common-search-columns
  (mapv (fn [column] [column column])
        [:id
         :model
         :model_id
         :collection_id
         :creator_id
         :database_id
         :last_editor_id
         :name
         :content
         :display_type
         :archived
         :official_collection
         :pinned
         :verified
         :dashboardcard_count
         :view_count
         :model_created_at
         :model_updated_at
         :last_viewed_at
         :metadata]))

(defn- keyword-search-query
  "Builds the full-text (keyword) search query over the index table.

  Matches the tsquery built from the context's `:search-string` against `text_search_vector`, or against
  `text_search_with_native_query_vector` when `:search-native-query` is set, applies the context filters, and ranks
  rows by `ts_rank_cd` as `keyword_rank`."
  [index search-context]
  (let [filters        (search-filters search-context)
        ts-search-expr (search/to-tsquery-expr (:search-string search-context))
        tsv-lang       (search/tsv-language)
        column-name    (name (if (:search-native-query search-context)
                               :text_search_with_native_query_vector
                               :text_search_vector))
        rank-sql       (format "row_number() OVER (ORDER BY ts_rank_cd(%s, query) DESC)" column-name)
        match-clause   [:raw (format "%s @@ query" column-name)]]
    {:select   (conj common-search-columns [[:raw rank-sql] :keyword_rank])
     :from     [(keyword (:table-name index))]
     ;; Using a join allows us to share the query expression between our SELECT and WHERE clauses.
     ;; This follows the same secure pattern as metabase.search.appdb.specialization.postgres/base-query
     :join     [[[:raw "to_tsquery('" tsv-lang "', " [:lift ts-search-expr] ")"]
                 :query] [:= 1 1]]
     :where    (if (seq filters)
                 [:and match-clause filters]
                 match-clause)
     :order-by [[:keyword_rank :asc]]
     :limit    (semantic-settings/semantic-search-results-limit)}))

(defn- semantic-search-query
  "Builds the vector-similarity search query.

  An inner CTE orders the whole table by cosine distance (`<=>`) to `embedding` and takes the configured result
  limit; the outer query keeps candidates within the maximum distance, applies the context filters afterwards
  (post-filtering, to enable HNSW index usage) and numbers the rows as `semantic_rank`."
  [index embedding search-context]
  (let [filters             (search-filters search-context)
        max-cosine-distance 0.7
        distance-expr       [:raw (str "embedding <=> " (format-embedding embedding))]
        ;; Inner query: pure vector search to better trigger HNSW index vs. seqscan
        ;; TODO: only pull in necessary extra columns from configured filters
        candidates          {:select   (conj common-search-columns [distance-expr :distance])
                             :from     [(keyword (:table-name index))]
                             :order-by [[distance-expr :asc]]
                             :limit    (semantic-settings/semantic-search-results-limit)}
        within-distance     [:<= :distance max-cosine-distance]]
    {:with     [[:vector_candidates candidates]]
     :select   (conj common-search-columns
                     [[:raw "row_number() OVER (ORDER BY distance ASC)"] :semantic_rank]
                     [:distance :semantic_score])
     :from     [:vector_candidates]
     :where    (if filters
                 [:and within-distance filters]
                 within-distance)
     :order-by [[:semantic_rank :asc]]}))

(defn- hybrid-select
  "For a `[col-name col-alias]` pair, return a select entry that coalesces the column from the vector and text
   result sets of the outer hybrid search query. The prefixes default to \"v.\" and \"t.\".

   (hybrid-select [:model_id :model_id]) -> [[:coalesce :v.model_id :t.model_id] :model_id]"
  ([col-name-and-alias]
   (hybrid-select col-name-and-alias "v." "t."))
  ([[col-name col-alias] vector-prefix text-prefix]
   (let [column  (name col-name)
         qualify (fn [table-prefix] (keyword (str table-prefix column)))]
     [[:coalesce (qualify vector-prefix) (qualify text-prefix)] col-alias])))

(defn- hybrid-search-query
  "Builds the hybrid search query: the semantic and keyword queries become CTEs that are FULL JOINed on `id`, with
  every common column coalesced across both sides and each side's rank (`semantic_rank`, `keyword_rank`) projected."
  [index embedding search-context]
  (let [semantic-results (semantic-search-query index embedding search-context)
        keyword-results  (keyword-search-query index search-context)
        coalesced-cols   (mapv hybrid-select common-search-columns)]
    {:with      [[:vector_results semantic-results]
                 [:text_results keyword-results]]
     :select    (conj coalesced-cols
                      [:v.semantic_rank :semantic_rank]
                      [:t.keyword_rank :keyword_rank])
     :from      [[:vector_results :v]]
     :full-join [[:text_results :t] [:= :v.id :t.id]]
     :limit     (semantic-settings/semantic-search-results-limit)}))

(defn- scored-search-query
  "Wraps the hybrid search query in a CTE and passes the projection to `scoring/with-scores` with `scorers`."
  [index embedding search-context scorers]
  ;; The purpose of this query is just to project the coalesced hybrid columns with standard names so the scorers know
  ;; what to call them (e.g. :model rather than [:coalesced :v.model :t.model]). Likewise, the :search_index alias
  ;; allows us to re-use scoring expressions between the appdb and semantic backends without adjusting column names.
  (->> {:with   [[:hybrid_results (hybrid-search-query index embedding search-context)]]
        :select [:id :model_id :model :content :verified :metadata :semantic_rank :keyword_rank]
        :from   [[:hybrid_results :search_index]]
        :limit  (semantic-settings/semantic-search-results-limit)}
       (scoring/with-scores search-context scorers)))

(defn- legacy-input-with-score
  "Returns the `:legacy_input` stored in `row`'s metadata, with `:score` set to the row's `:total_score`
  (1.0 when absent) and `:all-scores` set to the result of `scoring/all-scores`."
  [weights scorers row]
  (let [legacy-input (get-in row [:metadata :legacy_input])]
    (assoc legacy-input
           :score      (get row :total_score 1.0)
           :all-scores (scoring/all-scores weights scorers row))))

(defn- decode-metadata
  "Replaces `row`'s `:metadata` PGobject with its JSON-decoded value."
  [row]
  (assoc row :metadata (decode-pgobject (get row :metadata))))

(defn- filter-can-read-indexed-entity
  "Returns (as a vector) the indexed-entity docs whose parent card passes `mi/can-read?`.

  Each doc id is resolved to a model index id, model indexes to their cards via `:model_id`; docs whose model index
  or card cannot be found are dropped."
  [indexed-entity-docs]
  (let [doc->index-id  (fn [doc] (search/indexed-entity-id->model-index-id (:id doc)))
        index-ids      (into #{} (map doc->index-id) indexed-entity-docs)
        model-indexes  (when (seq index-ids)
                         (t2/select :model/ModelIndex :id [:in index-ids]))
        index-id->card (when (seq model-indexes)
                         (let [cards       (t2/select :model/Card :id [:in (map :model_id model-indexes)])
                               cards-by-id (u/index-by :id cards)]
                           (into {}
                                 (map (fn [model-index]
                                        [(:id model-index) (get cards-by-id (:model_id model-index))]))
                                 model-indexes)))
        readable?      (fn [doc]
                         (some->> (doc->index-id doc)
                                  (get index-id->card)
                                  mi/can-read?))]
    (filterv readable? indexed-entity-docs)))

(defn- filter-read-permitted
  "Returns only those documents in `docs` whose corresponding t2 instances pass an mi/can-read? check for the bound api user.

  Docs with model \"indexed-entity\" are checked through `filter-can-read-indexed-entity`; all other docs are looked
  up as t2 instances, grouped by the t2 model of their search spec. The result is a vector with the permitted regular
  docs first, followed by the permitted indexed entities. Logs the counts and adds the elapsed time to the
  `:metabase-search/semantic-permission-filter-ms` metric."
  [docs]
  (let [timer (u/start-timer)
        ;; the t2 model is taken from the `:model` key of the search spec for the doc's model
        doc->t2-model (fn [doc] (:model (search/spec (:model doc))))
        {indexed-entities true regular-docs false} (group-by #(= "indexed-entity" (:model %)) docs)
        ;; one select per t2 model, fetching all instances for that model's doc ids
        other-docs (for [[t2-model docs] (group-by doc->t2-model regular-docs)
                         t2-instance (t2/select t2-model :id [:in (map :id docs)])]
                     t2-instance)
        permitted-entities (filter-can-read-indexed-entity indexed-entities)
        ;; doc -> fetched instance, keyed on [id t2-model] so equal ids of different models do not collide
        doc->t2 (comp (u/index-by (juxt :id t2/model) other-docs)
                      (juxt :id doc->t2-model))
        ;; docs without a fetched instance are dropped (`some->` short-circuits on nil)
        result (into
                (filterv (fn [doc] (some-> doc doc->t2 mi/can-read?)) regular-docs)
                permitted-entities)
        time-ms (u/since-ms timer)]

    (log/debug "Permission filtering" {:before-count (count docs)
                                       :after-count (count result)
                                       :indexed-entities-count (count indexed-entities)
                                       :regular-docs-count (count regular-docs)
                                       :time-ms time-ms})

    (analytics/inc! :metabase-search/semantic-permission-filter-ms time-ms)

    result))

(defn- get-personal-collection-ids
  "Returns the set of ids of personal collections that are the root of at least one collection in
  `collections-map`. The root id is the first path segment of a collection's `:location`. Returns nil when no
  location has a root segment."
  [collections-map]
  (let [location->root-id   (fn [location]
                              (some-> (re-find #"^/(\d+)/" location)
                                      second
                                      parse-long))
        root-collection-ids (distinct
                             (keep location->root-id
                                   (map :location (vals collections-map))))]
    (when (seq root-collection-ids)
      (into #{}
            (map :id)
            (t2/select [:collection :id]
                       :id [:in root-collection-ids]
                       :personal_owner_id [:not= nil])))))

(defn- filter-by-collection
  "Applies the personal-collection filter to `docs` in memory.
  Equivalent to metabase.search.filter/personal-collections-where-clause.

  The filter type is read from `:filter-items-in-personal-collection` (nil is treated as \"all\"); a type not listed
  below returns `docs` unchanged.

  | Filter         | Personal | Others' Personal | Shared Coll. | No Coll. |
  |----------------|----------|------------------|--------------|----------|
  | all            | ✅       | ✅               | ✅           | ✅       |
  | only-mine      | ✅       | ❌               | ❌           | ❌       |
  | only           | ✅       | ✅               | ❌           | ❌       |
  | exclude        | ❌       | ❌               | ✅           | ✅       |
  | exclude-others | ✅       | ❌               | ✅           | ✅       |
  "
  [docs {:keys [filter-items-in-personal-collection current-user-id]}]
  (let [filter-type (or filter-items-in-personal-collection "all")]
    (if (= "all" filter-type)
      docs
      (let [doc-collection-ids          (keep :collection_id docs)
            collections-map             (when (seq doc-collection-ids)
                                          (into {}
                                                (map (juxt :id identity))
                                                (t2/select [:collection :id :location :personal_owner_id]
                                                           :id [:in doc-collection-ids])))
            ;; "only-mine" never consults the set of all personal roots, so skip that lookup for it.
            personal-collection-ids     (when-not (= filter-type "only-mine")
                                          (get-personal-collection-ids collections-map))
            user-personal-collection-id (t2/select-one-pk :model/Collection :personal_owner_id [:= current-user-id])
            ;; --- predicates on a collection row ---
            personal-root?              (fn [coll] (some? (:personal_owner_id coll)))
            owned-by-user?              (fn [coll] (= (:personal_owner_id coll) current-user-id))
            under-user-root?            (fn [coll]
                                          (str/starts-with? (:location coll)
                                                            (str "/" user-personal-collection-id "/")))
            under-any-personal-root?    (fn [coll]
                                          (when-let [[_ root-id] (re-find #"^/(\d+)/" (:location coll))]
                                            (contains? personal-collection-ids (parse-long root-id))))
            mine?                       (fn [coll] (or (owned-by-user? coll) (under-user-root? coll)))
            personal?                   (fn [coll] (or (personal-root? coll) (under-any-personal-root? coll)))
            ;; --- predicates on a doc ---
            collection-of               (fn [doc] (get collections-map (:collection_id doc)))
            uncollected?                (fn [doc] (nil? (:collection_id doc)))
            keep-doc?                   (case filter-type
                                          "only-mine"
                                          (fn [doc]
                                            (when-let [coll (collection-of doc)]
                                              (mine? coll)))

                                          "only"
                                          (fn [doc]
                                            (when-let [coll (collection-of doc)]
                                              (personal? coll)))

                                          "exclude"
                                          (fn [doc]
                                            (or (uncollected? doc)
                                                (when-let [coll (collection-of doc)]
                                                  (not (personal? coll)))))

                                          "exclude-others"
                                          (fn [doc]
                                            (or (uncollected? doc)
                                                (when-let [coll (collection-of doc)]
                                                  (or (mine? coll)
                                                      (not (personal? coll))))))

                                          ;; unrecognised filter type: no predicate, docs pass through untouched
                                          nil)]
        (if keep-doc?
          (filterv keep-doc? docs)
          docs)))))

(defn- filter-by-collection-id
  "Filter documents by collection and all descendant collections.

  A doc is kept when its `:collection_id` equals `collection-id`, or when the `:location` of its collection contains
  `/<collection-id>/` as a path segment. Locations are slash-delimited ancestor paths with the root first, so the
  target collection may appear anywhere in the path, not only at the start. Matching on the full `/id/` segment keeps
  e.g. `/15/` from matching collection 5. Docs without a collection, or whose collection row was not found, are only
  kept by the equality check. Returns a vector."
  [docs collection-id]
  (let [collection-ids   (keep :collection_id docs)
        collections-map  (when (seq collection-ids)
                           (->> (t2/select [:collection :id :location]
                                           :id [:in collection-ids])
                                (into {} (map (juxt :id identity)))))
        ancestor-segment (str "/" collection-id "/")
        descendant?      (fn [doc-collection-id]
                           ;; guard a missing row / nil location instead of throwing from the string check
                           (when-let [location (:location (get collections-map doc-collection-id))]
                             (str/includes? location ancestor-segment)))]
    (filterv (fn [doc]
               (let [doc-collection-id (:collection_id doc)]
                 (or (= doc-collection-id collection-id)
                     (when doc-collection-id
                       (descendant? doc-collection-id)))))
             docs)))

(defn- apply-collection-filter
  "Runs [[filter-by-collection]] over `docs` when `search-context` asks for a personal-collection filter, logging the
  before/after counts and reporting the elapsed time to analytics. A nil or \"all\" filter returns `docs` untouched."
  [search-context docs]
  (let [filter-type (:filter-items-in-personal-collection search-context)]
    (if (contains? #{nil "all"} filter-type)
      docs
      (let [started (u/start-timer)
            kept    (filter-by-collection docs search-context)
            elapsed (u/since-ms started)]
        (log/debug "Collection filter" {:filter  filter-type
                                        :before  (count docs)
                                        :after   (count kept)
                                        :dropped (- (count docs) (count kept))
                                        :time_ms elapsed})
        (analytics/inc! :metabase-search/semantic-collection-filter-ms elapsed)
        kept))))

(defn- apply-collection-id-filter
  "Runs [[filter-by-collection-id]] over `docs` when `search-context` carries a non-nil `:collection`, logging the
  before/after counts and reporting the elapsed time to analytics. Without a `:collection`, returns `docs` untouched."
  [search-context docs]
  (if-some [collection-id (:collection search-context)]
    (let [started (u/start-timer)
          kept    (filter-by-collection-id docs collection-id)
          elapsed (u/since-ms started)]
      (log/debug "Collection filter" {:collection-id collection-id
                                      :before        (count docs)
                                      :after         (count kept)
                                      :dropped       (- (count docs) (count kept))
                                      :time_ms       elapsed})
      (analytics/inc! :metabase-search/semantic-collection-id-filter-ms elapsed)
      kept)
    docs))

(defn- reducible-search-query
  "Formats the honeysql `query` with quoted identifiers and returns the `jdbc/plan` reducible for it against `db`,
  with rows built as unqualified lower-case maps. Kept as its own fn so tests can redef it."
  [db query]
  (let [sql-params (sql-format-quoted query)
        plan-opts  {:builder-fn jdbc.rs/as-unqualified-lower-maps}]
    (jdbc/plan db sql-params plan-opts)))

(defn query-index
  "Query the index for documents similar to the search string.
  Returns a map with :results and :raw-count."
  [db index search-context]
  (let [{:keys [embedding-model]} index
        search-string (:search-string search-context)]
    ;; A blank search string short-circuits: no embedding is requested and no query is run.
    (if (str/blank? search-string)
      {:results [] :raw-count 0}
      ;; NOTE: the order of the bindings below matters, each *-time-ms only covers the bindings between its timer
      ;; and its `u/since-ms` call. `timer` itself spans the whole search.
      (let [timer (u/start-timer)

            ;; Stage 1: embed the search string.
            embedding (embedding/get-embedding embedding-model search-string {:type :query})
            embedding-time-ms (u/since-ms timer)

            ;; Stage 2: build the scored query and read every row. `into []` eagerly reduces the reducible, applying
            ;; `xform` to each row, so the whole result set has been consumed once `raw-results` is bound.
            db-timer (u/start-timer)
            weights (search.config/weights search-context)
            scorers (scoring/semantic-scorers (:table-name index) search-context)
            query (scored-search-query index embedding search-context scorers)
            xform (comp (map decode-metadata)
                        (map (partial legacy-input-with-score weights (keys scorers))))
            reducible (reducible-search-query db query)
            raw-results (into [] xform reducible)
            db-query-time-ms (u/since-ms db-timer)

            ;; Stage 3: in-memory filtering. Permission check first, then the personal-collection and collection-id
            ;; filters (each returns its input unchanged when its search-context key is absent), then
            ;; `search/collapse-id` on every surviving doc.
            filter-timer (u/start-timer)
            filtered-results (->> raw-results
                                  filter-read-permitted
                                  (apply-collection-filter search-context)
                                  (apply-collection-id-filter search-context)
                                  (mapv search/collapse-id))
            filter-time-ms (u/since-ms filter-timer)

            ;; Stage 4: add appdb-based scores to the docs that survived filtering.
            appdb-scorers (scoring/appdb-scorers search-context)
            appdb-scores-timer (u/start-timer)
            final-results (->> filtered-results
                               (scoring/with-appdb-scores search-context appdb-scorers weights))
            appdb-scores-time-ms (u/since-ms appdb-scores-timer)
            total-time-ms (u/since-ms timer)]

        (log/debug "Semantic search"
                   {:search-string-length (count search-string)
                    :raw-results-count (count raw-results)
                    :final-results-count (count final-results)
                    :embedding-time-ms embedding-time-ms
                    :db-query-time-ms db-query-time-ms
                    :filter-time-ms filter-time-ms
                    :appdb-scores-time-ms appdb-scores-time-ms
                    :total-time-ms total-time-ms})

        ;; `filter-time-ms` is only logged above and has no metric of its own here; the individual filter fns
        ;; called in stage 3 report their own timings.
        (analytics/inc! :metabase-search/semantic-embedding-ms
                        {:embedding-model (:name embedding-model)}
                        embedding-time-ms)
        (analytics/inc! :metabase-search/semantic-db-query-ms
                        {:embedding-model (:name embedding-model)}
                        db-query-time-ms)
        (analytics/inc! :metabase-search/semantic-appdb-scores-ms
                        appdb-scores-time-ms)
        (analytics/inc! :metabase-search/semantic-search-ms
                        {:embedding-model (:name embedding-model)}
                        total-time-ms)

        ;; REPL aid for re-running the generated query by hand; a `comment` form evaluates to nil and its value is
        ;; discarded here.
        (comment
          (jdbc/execute! db (sql-format-quoted query)))

        ;; :raw-count is the number of rows returned by the index query, i.e. before any of the stage 3 filtering.
        {:results final-results
         :raw-count (count raw-results)}))))

(comment
  ;; REPL scratchpad: for each query builder (keyword, semantic, hybrid, scored), inspect the honeysql map, format it
  ;; to SQL and execute it, then run the full `query-index` pipeline.
  ;; `db` is not defined in this form; presumably it is the `db` def from the comment form near the top of this
  ;; namespace, so evaluate that first.
  (def embedding-model (embedding/get-configured-model))
  (def index (default-index embedding-model))
  (def search-ctx {:search-string "pasta"})
  (def embed (embedding/get-embedding embedding-model (:search-string search-ctx)))
  (def scorers (scoring/semantic-scorers (:table-name index) search-ctx))

  (keyword-search-query index search-ctx)
  (sql-format-quoted (keyword-search-query index search-ctx))
  (jdbc/execute! db (sql-format-quoted (keyword-search-query index search-ctx)))

  (semantic-search-query index embed search-ctx)
  ;; NOTE(review): this is the only formatting call here that uses plain `sql/format` (unquoted identifiers) instead
  ;; of `sql-format-quoted`; the execute call below does use the quoted variant.
  (sql/format (semantic-search-query index embed search-ctx))
  (jdbc/execute! db (sql-format-quoted (semantic-search-query index embed search-ctx)))

  (hybrid-search-query index embed search-ctx)
  (sql-format-quoted (hybrid-search-query index embed search-ctx))
  (jdbc/execute! db (sql-format-quoted (hybrid-search-query index embed search-ctx)))

  (scored-search-query index embed search-ctx scorers)
  (sql-format-quoted (scored-search-query index embed search-ctx scorers))
  (jdbc/execute! db (sql-format-quoted (scored-search-query index embed search-ctx scorers)))

  (query-index db index search-ctx))

(defn- delete-from-index-batch-sql
  "Builds the formatted (quoted) DELETE statement that removes the rows of `table-name` whose `model` column equals
  `model` and whose `model_id` is one of `batch-ids` (each id is stringified). Returns nil for an empty batch."
  [model table-name batch-ids]
  (when-let [ids (seq batch-ids)]
    (let [matches-batch [:and
                         [:= :model model]
                         [:in :model_id (map str ids)]]
          delete-query  (sql.helpers/where (sql.helpers/delete-from table-name)
                                           matches-batch)]
      (sql-format-quoted delete-query))))

(defn delete-from-index!
  "Removes the documents identified by `model` and `model-ids` from the table of `index`, delegating the batching to
  `batch-delete-ids!` with a per-batch DELETE statement builder."
  [db index model model-ids]
  (let [table-name     (:table-name index)
        table-keyword  (keyword table-name)
        batch->sql     (fn [batch-ids]
                         (delete-from-index-batch-sql model table-keyword batch-ids))]
    (batch-delete-ids! db table-name model batch->sql model-ids)))

(comment
  ;; REPL scratchpad: create the index table (with `:force-reset? true`), upsert one test document, delete documents
  ;; by model + id, then query the index first with no bound user and then as a test user.
  ;; `db` is not defined in this form (only `pgvector` is); presumably it is the `db` def from the comment form near
  ;; the top of this namespace.
  (def pgvector ((requiring-resolve 'metabase-enterprise.semantic-search.env/get-pgvector-datasource!)))
  (def embedding-model (embedding/get-configured-model))
  (def index (default-index embedding-model))
  (create-index-table-if-not-exists! pgvector index {:force-reset? true})
  (upsert-index! db index [{:model "card"
                            :id "1"
                            :searchable_text "This is a test card"}])
  (delete-from-index! db index "card" ["1"])
  (delete-from-index! db index "dashboard" ["13"])
  ;; no user
  (query-index db index {:search-string "Copper knife"})

  ;; same query, but with a test user bound for the permission checks
  #_:clj-kondo/ignore
  (require '[metabase.test :as mt])
  (mt/with-test-user :crowberto
    (doall (query-index db index {:search-string "Copper knife"}))))

(comment
  ;; Query performance analysis with EXPLAIN ANALYZE
  ;; Run this to debug query performance and execution plans
  ;; `sql-with-params` is a [sql & params] vector as produced by `sql-format-quoted`: the EXPLAIN prefix is prepended
  ;; to the sql string and the params are passed through unchanged.
  (defn explain-analyze-query
    [db sql-with-params]
    (let [[sql & params] sql-with-params
          explain-sql (str "EXPLAIN (ANALYZE true, BUFFERS true, FORMAT text) " sql)]
      (jdbc/execute! db (into [explain-sql] params))))

  ((requiring-resolve 'metabase-enterprise.semantic-search.db.datasource/init-db!))
  (def db @@(requiring-resolve 'metabase-enterprise.semantic-search.db.datasource/data-source))
  ;; Inspect the current planner settings, then tweak them.
  (jdbc/execute! db ["SHOW random_page_cost;"])      ; read current value (lowering it makes index scans appear cheaper)
  (jdbc/execute! db ["SHOW seq_page_cost;"])         ; default 1
  (jdbc/execute! db ["SHOW hnsw.ef_search;"])
  (jdbc/execute! db ["SET random_page_cost = 1.0;"]) ; Default 4, should set to seq_page_cost (default 1)
  (jdbc/execute! db ["SET enable_seqscan = ON;"])    ; ON|OFF - Disable seq scan to force index scan

  (def embedding-model (embedding/get-configured-model))
  (def index (default-index embedding-model))
  (def search-string "product orders")
  ;; Two alternative search contexts; whichever `def` is evaluated last wins.
  (def search-context {:search-string search-string
                       :models ["card" "dashboard"]
                       :archived? false
                       :verified nil})
  (def search-context {:search-string search-string})

  (def embedding (embedding/get-embedding (:embedding-model index) search-string))
  (def scorers (scoring/semantic-scorers (:table-name index) search-context))

  ;; Format queries for execution
  (def semantic-sql (sql-format-quoted (semantic-search-query index embedding search-context)))
  (def keyword-sql (sql-format-quoted (keyword-search-query index search-context)))
  (def hybrid-sql (sql-format-quoted (hybrid-search-query index embedding search-context)))
  (def scored-sql (sql-format-quoted (scored-search-query index embedding search-context scorers)))

  ;; do in repl ->
  #_(explain-analyze-query db semantic-sql)
  #_(explain-analyze-query db keyword-sql)
  #_(explain-analyze-query db hybrid-sql)
  #_(explain-analyze-query db scored-sql)

  (def existing-sql (sql-format-quoted (existing-embedding-query index ["Some Text"])))
  #_(explain-analyze-query db existing-sql)

  ;; Code to test the custom thread pool. The transduction should process batches in parallel and new batches should
  ;; only be realized in memory once a thread is available.
  ;; Simulated work: prints the first item of the batch, sleeps 2s, and returns the batch size under :count.
  (defn process-batch [batch]
    #_:clj-kondo/ignore
    (println "Processing batch starting with " (first batch))
    (Thread/sleep 2000)
    {:count (count batch)})

  ;; Prints each item as it is realized, so the realization order can be compared with the processing order.
  (defn logging-range [n]
    #_:clj-kondo/ignore
    (map #(do (println "Realizing item" %) %)
         (range n)))

  (defn reducible [n]
    (u/rconcat (logging-range n) []))

  ;; Partition 100 items into batches of 10, submit each batch to the pool as a future, then deref the futures and
  ;; sum the per-batch {:count n} maps.
  (let [pool @index-update-executor
        futures
        (transduce
         (comp
          (partition-all 10)
          (map #(cp/future pool (process-batch %))))
         conj
         (reducible 100))]
    (reduce (fn [acc fut]
              (merge-with + acc @fut))
            {}
            futures)))
